0%

Flink Source Code

42674d258e3ba907d1d288d8f3da38b.jpg

菜逼的Apache源码阅读之路

Flink 源码阅读

前言

很久之前我就在想怎么阅读源码,大佬的回答总是出奇的一致

“耐心”

哈哈,仿佛有了耐心就是万能的

于是菜逼那会去Github Clone了一份源码,结果在第一步就卡了好久,笑,在第一步学会了使用Git设置代理,学会了去官方文档寻找编译需要的环境(环境真的坑死人),需要合适的JDK版本,需要合适的Mvn版本,否则你是永远不可能编译成功的

涉及UI的Apache项目一般来说还会依赖很多东西,比如说Node Js之类的,他们会在POM中的自动脚本插件里面附上一些bash操作,对于不同的电脑环境来说未必是能成功的,里面涉及到一些Mvn项目的改动,那段时间确切得感受到了Mac的便利

终于把基本环境搞定了,接下来一个大型Apache项目中会有大量的代码,从哪阅读,怎么阅读也是个问题,首先要懂一点设计模式对吧,不然光看那些不知所以然的解耦就不知道写代码的人在做什么,满脸懵逼而已,核心的代码,看着26个英文字母都懂,组合在一起就不知道这些代码在干锤子了

后来又刷了一波LeeCode,终于明白了人和人的差距,连LeeCode这么简单的算法,都有各种巧妙的实现方式,有时候配上注释都要看半天,更别说各种精妙的算法在工程里面的实现了,释然了,这事情只能慢慢去补,如果是梳理框架,使用的话,先从架构层面,理解源码,每个东西在代码中的作用先了然,再谈实现方式,我尝试着尽量不在细节上面纠结,这样阅读代码的收获和成就感会强得多,才能支持自己读下去,书和博客都是梳理架构的好途径

只要能坚持,总能从优秀的Apache 代码里面学到越来越多的东西,理解应该也会越来越多,上面的是截止到目前为止,菜逼的一些简单感悟,后面如果有新的看法会过来更新
剩下需要的就是耐心和时间 :)

源码阅读入口

Flink提供了系列shell脚本用于flink集群管理、job提交等,通过分析这些脚本找到自己所关心的核心链路入口是比较合适的。

Flink提供了两个启动脚本:bin/start-local.sh 用于启动单机模式的Flink;bin/start-cluster.sh 用于启动集群模式的Flink。

(1)start-local.sh

(2)start-cluster.sh

  • 解析提取flink-yaml.xml中的配置项。
  • 通过ssh远程启动各master机器上的jobmaster进程(需要在conf/masters中配置master机器的ip地址,默认是localhost:8081)。
  • 启动taskmanager进程(需要在conf/slaves配置slave机器的ip地址,通常是localhost)。

由flink-daemon.sh可知,Flink中各主要进程的入口对应关系如下:

jobmanager org.apache.flink.runtime.jobmanager.JobManager
taskmanager org.apache.flink.runtime.taskmanager.TaskManager
内置zookeeper org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
historyserver org.apache.flink.runtime.webmonitor.history.HistoryServer

Flink提供的CLI脚本是bin/flink,可以通过该脚本提交Job、创建Savepoint等。

脚本的主要流程:

  • 解析提取flink-yaml.xml中的配置项。
  • 通过Client入口org.apache.flink.client.CliFrontend连接到JobManager并发送消息。

DUBUG主要流程:

将源码导入到IDE中(如IDEA),本地debug基本方法如下:

1、在jvm启动参数中添加远程调试参数

(1)如果是调试Client,可以将上述参数加到bin/flink脚本的最后一行中,形如:
JVM_REMOTE_DEBUG_ARGS=’-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005’
exec $JAVA_RUN $JVM_ARGS $JVM_REMOTE_DEBUG_ARGS “${log_setting[@]}” -classpath “manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"“ org.apache.flink.client.CliFrontend “$@”
(2)如果是调试JobManager或TaskManager,可以在conf/flink-conf.yaml中添加:

env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

2、启动flink client或jobmanager或taskmanager,此时程序会suspend等待debuger连接(通过suspend=y来配置)。

3、配置IDEA中的remote:host配置为localhost,配置port(参考1中的配置的address端口)。

4、在Flink源码中设置断点,连接远程host,然后就可以开始debug跟踪了。

Flink Annotations

flink-annotations模块定义了一些flink项目中需要用到的注解。Java注解是附加在代码中的一些元信息,用于一些工具在编译、运行时进行解析和使用,起到说明、配置的功能。该模块主要包括的注解类型有:

undefined

flink annotations下包含了docs相关的三种注解:ConfigGroup,ConfigGroups和Documentation。然后还有其他5种注解:Experimental,Internal, Public,PublicEnvolving和VisableForTesting。下面分别简单地介绍下这些注解的作用。

docs相关的三个注解

ConfigGroup

1
2
3
4
5
6
@Target({})
@Internal
public @interface ConfigGroup {
String name();
String keyPrefix();
}

这个注解的作用是指定一组配置选项的类。该组的name将被用作生成的HTML文件的文件名。

ConfigGroups

1
2
3
4
5
6
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface ConfigGroups {
ConfigGroup[] groups() default {};
}

这个注解是提供了一种根据key的最大前缀来把配置选项拆分为不同的组。

Documentation

这个类主要是修改文档生成器的行为的注解集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final class Documentation {

// 用于配置选项字段的以重写已记录的默认值
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface OverrideDefault {
String value();
}

// 用于配置选项字段的注释,以便将它们包括在“公共选项”部分中。
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface CommonOption {
int POSITION_MEMORY = 10;
int POSITION_PARALLELISM_SLOTS = 20;
int POSITION_FAULT_TOLERANCE = 30;
int POSITION_HIGH_AVAILABILITY = 40;
int POSITION_SECURITY = 50;

int position() default Integer.MAX_VALUE;
}

// 在配置选项字段上使用的注释,以从文档中排除配置选项。
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface ExcludeFromDocumentation {

String value() default "";
}

private Documentation(){
}
}

Experimental

1
2
3
4
5
@Documented
@Target(ElementType.TYPE)
@Public
public @interface Experimental {
}

标注类为实验阶段。带有此注释的类既没有经过严格的测试,也还不稳定,并且可以更改或删除在未来版本中。

Internal

1
2
3
4
5
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
@Public
public @interface Internal {
}

该注解用于将稳定的公共API中的方法标记为内部开发人员API。开发人员API是稳定的,但仅仅是在Flink内部,但是在发布版本有可能有些变化。

Public

1
2
3
4
@Documented
@Target(ElementType.TYPE)
@Public
public @interface Public {}

标注类为开放和稳定的。类,方法或者属性被这个这个注解修饰时,表示在小版本迭代中,都维持稳定。

PublicEvolving

1
2
3
4
5
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Public
public @interface PublicEvolving {
}

该注解用来标注公共的但有不断发展的接口依赖的类或者方法。带有此注释的类和方法用于公共使用,并且具有稳定的行为。但是,它们的接口和签名不被认为是稳定的,并且当跨版本时可能会变化。

VisibleForTesting

1
2
3
4
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Internal
public @interface VisibleForTesting {}

这个注解申明有些函数,属性,构造函数或整个类型值是在test时才是可见的。当例如方法应该是编码阶段,通常附加这个注释(因为它不打算在外部调用),但不能声明为私有,因为一些测试需要访问它。

ParameterTool

先从简单的看起,这个是Flink自带的工具类, 位置:

73E97217-CF9B-4d73-A33A-5AD2140FA487.png

结构:

未命名1603941927.png

ParameterTool 里面的可以从多种数据来源创建

比如方法:

fromPropertiesFile

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Returns {@link ParameterTool} for the given {@link Properties} file.
*
* @param path Path to the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(String path) throws IOException {
File propertiesFile = new File(path);
return fromPropertiesFile(propertiesFile);
}

可以看到下面调用了工具类里面的另外一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Returns {@link ParameterTool} for the given {@link Properties} file.
*
* @param file File object to the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(File file) throws IOException {
if (!file.exists()) {
throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
}
try (FileInputStream fis = new FileInputStream(file)) {
return fromPropertiesFile(fis);
}
}

继续向下套娃

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
*
* @param inputStream InputStream from the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
Properties props = new Properties();
props.load(inputStream);
return fromMap((Map) props);
}
1
2
3
4
5
6
7
8
9
10
/**
* Returns {@link ParameterTool} for the given map.
*
* @param map A map of arguments. Both Key and Value have to be Strings
* @return A {@link ParameterTool}
*/
public static ParameterTool fromMap(Map<String, String> map) {
Preconditions.checkNotNull(map, "Unable to initialize from empty map");
return new ParameterTool(map);
}

这边调用了ParameterTool的构造器,然后窜入了参数 最终构造出了这个 ParameterTool

1
2
3
4
5
6
7
8
9
private ParameterTool(Map<String, String> data) {
this.data = Collections.unmodifiableMap(new HashMap<>(data));

this.defaultData = new ConcurrentHashMap<>(data.size());

this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));

unrequestedParameters.addAll(data.keySet());
}

在这个过程里面,inputStream 转化成了 Map<String,String>直到最后的传入。

工具类的源码比较简单,代码也比较工整,很容易看懂。

FromArgs用法:

1
2
3
4
ParameterTool parameters = ParameterTool.fromArgs(args);
String local_path = parameters.get("local_path",null); //指定参数名:local_path
//读取配置文件
ParameterTool paramFromProps = ParameterTool.fromPropertiesFile(local_path);

代码打包完成jar,在服务器启动Flink任务时,需要添加自定义参数local_path,指定配置文件的绝对路径。比如:

1
flink run flink.jar -local_path ./config.properties

Reference

开源发布的版本号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Alpha、Beta、Gamma版本

以下三者,在系统、架包、软件的开发过程中,使用。
Alpha:内测版,BUG多,开发人员开发过程中使用,希腊字母α,第一,指最初版
Beta:早期版本,有缺陷,无大BUG,可能加入新功能,进一步开发完善。
Gamma: 经beta 版,完善修改,成为正式发布的候选版本(Release Candidate)

RC、GA、R版本
RC:(Release Candidate):候选版本,几乎就是正式版了,
GA:(Ggeneral Availability):发行稳定版,官方推荐使用此版本。
R,RELEASE:正式版,等价于GA

SNAPSHOT版本
SNAPSHOT:快照版,可以稳定使用,且仍在继续改进版本。

snapshot多见于架包依赖中,使用Maven时,需要的架包回去仓库Nexus中找,,一个仓库一般分为RELEASE仓和SNAPSHOT仓快照版,前者理解,稳定正式版本,后者如何?Maven在构建项目时,会优先去远程仓库中查看是否有最新的example-1.0-SNAPSHOT.jar,如果有则下载下来使用。即使本地仓库中已经有了example-1.0-SNAPSHOT.jar,它也会尝试去远程仓库中查看同名的jar是否是最新的。

有的人可能会问,这样不就不能充分利用本地仓库的缓存机制了吗?别着急,Maven比我们想象中的要聪明。在配置Maven的Repository的时候中有个配置项,可以配置对于SNAPSHOT版本向远程仓库中查找的频率。频率共有四种,分别是always、daily、interval、never。当本地仓库中存在需要的依赖项目时,always是每次都去远程仓库查看是否有更新,daily是只在第一次的时候查看是否有更新,当天的其它时候则不会查看;interval允许设置一个分钟为单位的间隔时间,在这个间隔时间内只会去远程仓库中查找一次,never是不会去远程仓库中查找(这种就和正式版本的行为一样了)。

其他版本
Alpha:内部测试版
Beta:外部测试版
Build:修正版
Corporation或Enterprise:企业版
Delux:豪华版
DEMO:演示版,有功能限制
Free:免费版
Full:完全版
Final:正式版
Pro(professional):专业版
Plus:加强版
Retail:零售版
Release:发行版,有时间限制
Shareware:共享版,虽然不会要求注册但是一般也有功能限制
SR:修正版
Trial:试用版(一般有时间或者功能限制)